AbstractNode proporciona la funcionalidad base para nodos en el sistema.
type AbstractNode struct {
Name string
Address string
Timeout time.Duration
NodeAddresses map[string]string
Context *zmq.Context
Socket *zmq.Socket
Logger *log.Logger
Handler // Composición de la interfaz Handler
}
func InitializeNodeWithAddresses(name, address string, timeout time.Duration, addresses map[string]string) (*AbstractNode, error)
InitializeNodeWithAddresses inicializa un nodo con direcciones de otros nodos.
func NewAbstractNode(name, address string, timeout time.Duration) (*AbstractNode, error)
NewAbstractNode crea e inicializa un nuevo nodo base.
func (n *AbstractNode) Close() error
Close cierra los recursos del nodo.
func (n *AbstractNode) SendMessageAsync(address, message string) error
SendMessageAsync envía un mensaje de manera asíncrona.
func (n *AbstractNode) SendMessageSync(address string, message string) (string, error)
SendMessageSync envía un mensaje de forma síncrona y espera una respuesta.
func (n *AbstractNode) StartListening() error
StartListening inicia el proceso de escucha para mensajes entrantes en el nodo. Configura un socket de tipo REP (Response) para recibir y responder a mensajes.
type Config struct {
Leader LeaderConfig `json:"leader"`
Followers []FollowerConfig `json:"followers"`
Timeout time.Duration `json:"timeout"`
}
func LoadConfig(filepath string) *Config
type DeltaRequest struct {
Message string `json:"message"`
Operation string `json:"operation"`
Delta int64 `json:"delta"`
LeaderAddr string `json:"leader_address"`
}
ErrorType enum para los diferentes tipos de errores posibles en ZeroMQ.
type ErrorType int
const (
// CONNECTION_ERROR Error relacionado con la conexión de ZeroMQ
CONNECTION_ERROR ErrorType = iota
// SEND_ERROR Error al enviar un mensaje a través del socket
SEND_ERROR
// RECEIVE_ERROR Error al recibir un mensaje del socket
RECEIVE_ERROR
)
Follower representa un nodo seguidor en el sistema distribuido.
type Follower struct {
LeaderAddress string
// contains filtered or unexported fields
}
func InitializeFollowerNode(name, address, leaderAddress string, timeout time.Duration) (*Follower, error)
InitializeNode inicializa el nodo seguidor con su información específica.
func NewFollower(name, address, leaderAddress string, timeout time.Duration) (*Follower, error)
NewFollower crea e inicializa un nuevo nodo seguidor.
func (f *Follower) HandleProcess(message string) (string, error)
HandleProcess maneja y procesa los mensajes recibidos del líder. Implementación de HandleProcess para Follower
func (f *Follower) StartAlgorithm() error
StartAlgorithm configura e inicia el socket REP para escuchar mensajes entrantes y delega la responsabilidad de iniciar la escucha al nodo abstracto. StartAlgorithm configura e inicia la escucha en el seguidor
type FollowerConfig struct {
Name string `json:"name"`
Address string `json:"address"`
}
FollowerInfo encapsula información sobre un nodo seguidor en el sistema.
type FollowerInfo struct {
Name string // Nombre del seguidor
Address string // Dirección del seguidor en formato "host:puerto"
State FollowerState // Estado actual del seguidor
DateFollower time.Time // Fecha y hora local del seguidor
LocalTime int64 // Hora local del seguidor en milisegundos desde la época UNIX
CommunicationTime int64 // Tiempo de comunicación entre líder y seguidor en milisegundos
TripTime int64 // Tiempo de ida y vuelta (triptime) estimado
DiffTime int64 // Diferencia de tiempo calculada entre líder y seguidor
Delta int64 // Diferencia global de tiempo (delta) aplicada al seguidor
}
func NewFollowerInfo(address, name string, localTime, communicationTime, nowTime int64) *FollowerInfo
NewFollowerInfo crea un nuevo objeto FollowerInfo con los valores proporcionados.
func (f *FollowerInfo) GetAddress() string
GetAddress devuelve la dirección del seguidor.
func (f *FollowerInfo) GetCommunicationTime() int64
GetCommunicationTime devuelve el tiempo de comunicación entre líder y seguidor.
func (f *FollowerInfo) GetDateFollower() time.Time
GetDateFollower devuelve la fecha y hora local del seguidor.
func (f *FollowerInfo) GetDelta() int64
GetDelta devuelve la diferencia global de tiempo (delta).
func (f *FollowerInfo) GetDiffTime() int64
GetDiffTime devuelve la diferencia de tiempo entre el líder y el seguidor.
func (f *FollowerInfo) GetLocalTime() int64
GetLocalTime devuelve la hora local del seguidor.
func (f *FollowerInfo) GetName() string
GetName devuelve el nombre del seguidor.
func (f *FollowerInfo) GetState() FollowerState
GetState devuelve el estado actual del seguidor.
func (f *FollowerInfo) SetDelta(delta int64)
SetDelta establece la diferencia global de tiempo (delta) para el seguidor.
func (f *FollowerInfo) SetState(state FollowerState)
SetState establece el estado del seguidor.
func (f *FollowerInfo) String() string
String genera una representación en cadena del objeto FollowerInfo.
FollowerState representa los posibles estados de un seguidor durante el proceso de actualización de la hora del sistema.
type FollowerState string
const (
// RESPONDED indica que el seguidor ha respondido correctamente a la solicitud del líder.
Responded FollowerState = "RESPONDED"
// NO_RESPONSE indica que el seguidor no ha respondido a la solicitud dentro del tiempo esperado.
NoResponse FollowerState = "NO_RESPONSE"
// CONNECTION_ERROR indica que hubo un error de conexión al intentar comunicarse con el seguidor.
ConnectionError FollowerState = "CONNECTION_ERROR"
// REQUEST_NOT_SENT indica que la solicitud de actualización de la hora no fue enviada debido a algún error.
RequestNotSent FollowerState = "REQUEST_NOT_SENT"
// REQUEST_DELTA_SENT indica que la solicitud de actualización de la hora fue enviada al seguidor para actualizar su hora.
RequestDeltaSent FollowerState = "REQUEST_DELTA_SENT"
// TIME_ERROR_SENT_UPDATE indica que la solicitud de actualización de la hora fue enviada pero hubo un error en ese envío.
TimeErrorSentUpdate FollowerState = "TIME_ERROR_SENT_UPDATE"
// TIME_UPDATED indica que la hora del sistema del seguidor se actualizó correctamente.
TimeUpdated FollowerState = "TIME_UPDATED"
// ERROR_CLOSE indica que no pude enviar el cierre del socket en el seguidor.
ErrorClose FollowerState = "ERROR_CLOSE"
// Ok_CLOSE indica que pude cerrar el socket en el seguidor.
OkClose FollowerState = "Ok_CLOSE"
)
Interfaz Handler con el método HandleProcess
type Handler interface {
HandleProcess(message string) (string, error)
}
INode define las operaciones básicas para un nodo en el sistema.
type INode interface {
InitializeNode(name string, address string, timeout time.Duration) error
InitializeNodeWithAddresses(name string, address string, timeout time.Duration, addresses map[string]string) error
SendMessageSync(address string, message string) (string, error)
SendMessageAsync(address string, message string) error
StartListening() error
StartAlgorithm() error
Close() error
}
Leader representa el nodo líder en el sistema Berkeley.
type Leader struct {
UnreachableFollowers map[string]*FollowerInfo
SuccessfulFollowers map[string]*FollowerInfo
NonRespondingFollowers map[string]*FollowerInfo
TimeUpdatedFollowers map[string]*FollowerInfo
FailedFollowers map[string]*FollowerInfo
//mu sync.Mutex // Mutex para proteger los mapas en accesos concurrentes
Logger *log.Logger
// contains filtered or unexported fields
}
func InitializeLeaderNode(name, address string, timeout time.Duration, nodeAddresses map[string]string) (*Leader, error)
InitializeLeaderNode crea e inicializa un nuevo nodo líder.
func (l *Leader) Close()
func (l *Leader) HandleProcess(message string) (string, error)
HandleProcess implements Handler.
func (l *Leader) StartAlgorithm()
StartAlgorithm implementa el algoritmo de sincronización Berkeley para el líder.
type LeaderConfig struct {
Name string `json:"name"`
Address string `json:"address"`
}
LoggerManager es responsable de gestionar los loggers para las clases que lo invocan.
type LoggerManager struct{}
func (LoggerManager) GetLogger(clazz interface{}) *logrus.Logger
GetLogger obtiene el logger correspondiente a la clase que invoca este método.
SocketZeroMQException estructura que encapsula un error personalizado para ZeroMQ.
type SocketZeroMQException struct {
Message string // Mensaje de error
ErrorType ErrorType // Tipo de error
}
func NewSocketZeroMQException(message string, errorType ErrorType) *SocketZeroMQException
NewSocketZeroMQException crea una nueva instancia de SocketZeroMQException.
func (e *SocketZeroMQException) Error() string
Error implementación de la interfaz error para SocketZeroMQException.
func (e *SocketZeroMQException) GetErrorType() ErrorType
GetErrorType obtiene el tipo de error.
Mensaje JSON que se envía al seguidor.
type TimeRequest struct {
Message string `json:"message"`
Operation string `json:"operation"`
Time int64 `json:"time"`
LeaderAddr string `json:"leader_address"`
}